#ifndef __M_CHANNEL_H__
#define __M_CHANNEL_H__
#include "muduo/net/TcpConnection.h"
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_msg.pb.h"
#include "../mqcommon/mq_proto.pb.h"
#include "../mqcommon/mq_threadpool.hpp"
#include "mq_consumer.hpp"
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <unordered_map>
namespace bitmq
{
        typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
        using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;
        using basicConsumeResponsePtr = std::shared_ptr<basicConsumeResponse>;
        using basicCommonResponsePtr = std::shared_ptr<basicCommonResponse>;
        class Channel
        {
        public:
                using ptr = std::shared_ptr<Channel>;
                Channel(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec)
                    : _conn(conn), _codec(codec), _cid(UUIDHelper::uuid())
                {
                }
                ~Channel()
                {
                        basicCancel(); // 消费者取消订阅
                }
                std::string cid() { return _cid; }
                // 发送创建信道的请求
                bool openChannel()
                {
                        openChannelRequest req;
                        std::string rid = UUIDHelper::uuid();
                        req.set_rid(rid);
                        req.set_cid(_cid);
                        _codec->send(_conn, req);
                        // 等待服务器的响应
                        basicCommonResponsePtr resp = waitResponse(rid);
                        return resp->ok();
                }
                // 发送关闭信道的请求
                void closeChannel()
                {
                        closeChannelRequest req;
                        std::string rid = UUIDHelper::uuid();
                        req.set_rid(rid);
                        req.set_cid(_cid);
                        _codec->send(_conn, req);
                        // 等待服务器的响应
                        basicCommonResponsePtr resp = waitResponse(rid);
                        return;
                }
                // 交换机
                bool declareExchange(const std::string &name,
                                     ExchangeType type, bool durable, bool auto_delete,
                                     google::protobuf::Map<std::string, std::string> &args)
                {
                        // 构造一个声明虚拟机的请求对象
                        declareExchangeRequest req;
                        std::string rid = UUIDHelper::uuid();
                        req.set_rid(rid);  // 请求ID
                        req.set_cid(_cid); // 信道ID
                        req.set_exchange_name(name);
                        req.set_exchange_type(type);
                        req.set_auto_delete(auto_delete);
                        req.mutable_args()->swap(args);
                        // 然后向服务器发送请求
                        _codec->send(_conn, req); // muduo库里的send是一个异步接口，不能马上返回，此时有可能还没有发送成功
                        // 等待服务器的响应
                        basicCommonResponsePtr resp = waitResponse(rid);
                        // 返回

                        return resp->ok();
                }

                void deleteExchange(const std::string &name)
                {
                        std::string rid = UUIDHelper::uuid();
                        deleteExchangeRequest req;
                        req.set_rid(rid);
                        req.set_exchange_name(name);
                        req.set_cid(_cid);
                        // 然后向服务器发送请求
                        _codec->send(_conn, req); // muduo库里的send是一个异步接口，不能马上返回，此时有可能还没有发送成功
                                                  // 等待服务器的响应
                        waitResponse(rid);
                        // 返回
                        return;
                }
                // 队列
                bool declareQueue(const std::string &qname,
                                  bool qdurable, bool qexclusive, bool qauto_delete,
                                  google::protobuf::Map<std::string, std::string> &qargs)
                {
                        std::string rid = UUIDHelper::uuid();
                        declareQueueRequest req;
                        req.set_rid(rid);
                        req.set_cid(_cid);
                        req.set_queue_name(qname);
                        req.set_durable(qdurable);
                        req.set_auto_delete(qauto_delete);
                        req.set_exclusive(qexclusive);
                        req.mutable_args()->swap(qargs);

                        _codec->send(_conn, req);
                        basicCommonResponsePtr resp = waitResponse(rid);
                        return resp->ok();
                }
                void deleteQueue(const std::string &name)
                {
                        std::string rid = UUIDHelper::uuid();
                        deleteQueueRequest req;
                        req.set_rid(rid);
                        req.set_queue_name(name);
                        req.set_cid(_cid);
                        // 然后向服务器发送请求
                        _codec->send(_conn, req); // muduo库里的send是一个异步接口，不能马上返回，此时有可能还没有发送成功
                                                  // 等待服务器的响应
                        waitResponse(rid);
                        // 返回
                        return;
                }
                // 绑定
                bool queueBind(const std::string &ename, const std::string &qname, const std::string &key)
                {
                        std::string rid = UUIDHelper::uuid();
                        queueBindingRequest req;
                        req.set_rid(rid);
                        req.set_queue_name(qname);
                        req.set_exchange_name(ename);
                        req.set_cid(_cid);
                        req.set_binding_key(key);
                        // 然后向服务器发送请求
                        _codec->send(_conn, req); // muduo库里的send是一个异步接口，不能马上返回，此时有可能还没有发送成功
                                                  // 等待服务器的响应
                        basicCommonResponsePtr resp = waitResponse(rid);
                        // 返回
                        return resp->ok();
                }
                void queueUnBind(const std::string &ename, const std::string &qname)
                {
                        std::string rid = UUIDHelper::uuid();
                        queueUnBindingRequest req;
                        req.set_rid(rid);
                        req.set_queue_name(qname);
                        req.set_exchange_name(ename);
                        req.set_cid(_cid);
                        // 然后向服务器发送请求
                        _codec->send(_conn, req); // muduo库里的send是一个异步接口，不能马上返回，此时有可能还没有发送成功
                                                  // 等待服务器的响应
                        waitResponse(rid);
                        // 返回
                        return;
                }

                // 消息的发布
                void basicPublish(const std::string &ename, const BasicProperties *bp, const std::string &body)
                {
                        std::string rid = UUIDHelper::uuid();
                        basicPublishRequest req;
                        req.set_rid(rid);
                        req.set_exchange_name(ename);
                        req.set_cid(_cid);
                        req.set_body(body);
                        if (bp != nullptr)
                        {
                                req.mutable_properties()->set_id(bp->id());
                                req.mutable_properties()->set_delivery_mode(bp->delivery_mode());
                                req.mutable_properties()->set_routing_key(bp->routing_key());
                        }
                        _codec->send(_conn, req); // muduo库里的send是一个异步接口，不能马上返回，此时有可能还没有发送成功
                                                  // 等待服务器的响应
                        waitResponse(rid);
                }
                // 消息的确认
                void basicAck(const std::string &msgid)
                {
                        if (_consumer.get() == nullptr)
                        {
                                DLOG("消息确认时,找不到消费者信息！");
                                return;
                        }
                        std::string rid = UUIDHelper::uuid();
                        basicAckRequest req;
                        req.set_rid(rid);
                        req.set_queue_name(_consumer->qname);
                        req.set_cid(_cid);
                        req.set_message_id(msgid);
                        _codec->send(_conn, req); // muduo库里的send是一个异步接口，不能马上返回，此时有可能还没有发送成功
                                                  // 等待服务器的响应
                        waitResponse(rid);
                        return;
                }
                // 取消订阅
                void basicCancel()
                {
                        if (_consumer.get() == nullptr)
                        {
                                DLOG("取消订阅的时候，找不到消费者信息!");
                                return;
                        }
                        std::string rid = UUIDHelper::uuid();
                        basicCancelRequest req;
                        req.set_rid(rid);
                        req.set_cid(_cid);
                        req.set_consumer_tag(_consumer->tag);
                        req.set_queue_name(_consumer->qname);
                        _codec->send(_conn, req); // muduo库里的send是一个异步接口，不能马上返回，此时有可能还没有发送成功
                                                  // 等待服务器的响应
                        _consumer.reset();
                        waitResponse(rid);
                        return;
                }
                // 获取订阅的消息
                bool basicConsume(
                    const std::string &consumer_tag,
                    const std::string &queue_name,
                    bool auto_ack,
                    const ConsumerCallback &cb)
                {
                        if (_consumer.get() != nullptr)
                        {
                                DLOG("当前消费者已经订阅过消息队列！");
                                return false;
                        }
                        std::string rid = UUIDHelper::uuid();
                        basicConsumeRequest req;
                        req.set_rid(rid);
                        req.set_cid(_cid);
                        req.set_queue_name(queue_name);
                        req.set_consumer_tag(consumer_tag);
                        req.set_auto_ack(auto_ack);
                        _codec->send(_conn, req); // muduo库里的send是一个异步接口，不能马上返回，此时有可能还没有发送成功
                                                  // 等待服务器的响应
                        basicCommonResponsePtr resp = waitResponse(rid);
                        // 返回
                        if (resp->ok() == false)
                        {
                                DLOG("添加订阅失败！");
                                return false;
                        }

                        _consumer = std::make_shared<Consumer>(consumer_tag, queue_name, auto_ack, cb);
                        return resp->ok();
                }

        public:
                // 连接收到消息推送后，需要通过信道找到对应的消费者对象，通过回调函数进行消息处理
                void consume(const basicConsumeResponsePtr &resp)
                {

                        if (_consumer.get() == nullptr)
                        {
                                DLOG("消息处理时，未找到订阅者信息！");
                        }
                        if (_consumer->tag != resp->consumer_tag())
                        {
                                DLOG("收到的推送消息中的消费标识，与当前信道消费者标识不一致!");
                                return;
                        }
                        _consumer->callbalk(resp->consumer_tag(), resp->mutable_properties(), resp->body());
                }
                // 连接收到基础响应后，向hashmap中添加响应
                void putBasicResponse(const basicCommonResponsePtr &resp)
                {
                        std::unique_lock<std::mutex> lock(_mutex);

                        _basic_resq.insert(std::make_pair(resp->rid(), resp));
                        _cv.notify_all();
                }

        private:
                basicCommonResponsePtr waitResponse(const std::string &rid)
                {
                        std::unique_lock<std::mutex> lock(_mutex);
                        _cv.wait(lock, [&rid, this]()
                                 { return (_basic_resq.find(rid) != _basic_resq.end()); });
                        basicCommonResponsePtr basic_resp = _basic_resq[rid];
                        _basic_resq.erase(rid);
                        return basic_resp;
                }

        private:
                std::string _cid;
                muduo::net::TcpConnectionPtr _conn;
                ProtobufCodecPtr _codec;
                Consumer::ptr _consumer;
                std::mutex _mutex;
                std::condition_variable _cv;
                std::unordered_map<std::string, basicCommonResponsePtr> _basic_resq;
        };

        class ChannelManager
        {
        public:
                using ptr = std::shared_ptr<ChannelManager>;
                ChannelManager() {}
                Channel::ptr create(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec)
                {
                        std::unique_lock<std::mutex> lock(_mutex);
                        auto channel = std::make_shared<Channel>(conn, codec);
                        _channels.insert(std::make_pair(channel->cid(), channel));
                        return channel;
                }
                void remove(const std::string &cid)
                {
                        std::unique_lock<std::mutex> lock(_mutex);
                        _channels.erase(cid);
                }
                Channel::ptr get(const std::string &cid)
                {
                        std::unique_lock<std::mutex> lock(_mutex);
                        auto it = _channels.find(cid);
                        if (it == _channels.end())
                        {
                                return Channel::ptr();
                        }
                        return it->second;
                }

        private:
                std::mutex _mutex;
                std::unordered_map<std::string, Channel::ptr> _channels;
        };
}

#endif